package com.atguigu.flink.sql.function;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;

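/**
 * Created by Smexy on 2023/11/20
 *
 * <p>A table aggregate function (UDTAF) is essentially a table function (it may emit
 * several rows per call), except that the rows it emits are derived from an
 * aggregated accumulator rather than directly from the current input row.
 *
 * <p>Example: for each sensor id, track the two largest vc values seen so far.
 * For every incoming record the aggregate is updated first, and then one or two
 * rows of two columns (rank label, vc value) are emitted.
 *
 * <pre>
 *   input (id, ts, vc):      output of myUdtaf(vc):
 *   s1,1,90                  first,90
 *   s1,2,100                 first,100
 *                            second,90
 * </pre>
 */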
public class Demo5_UDTAFFunctionCustom
{
    /**
     * Creates a streaming-mode {@link TableEnvironment}, registers the JSON file
     * {@code data/ws.json} as table {@code t1(id, ts, vc)}, groups it by {@code id},
     * applies {@link MyTop2MaxVc} to the {@code vc} column and prints the result.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String createTableSql = "CREATE TABLE t1 (" +
            "  id STRING," +
            "  ts BIGINT," +
            "  vc INT " +
            ")  WITH (" +
            "  'connector' = 'filesystem',   " +
            "  'path' = 'data/ws.json', " +
            "  'format' = 'json'             " +
            ")";

        tableEnv.executeSql(createTableSql);

        // Table aggregate functions cannot be used from SQL: the SQL parser used by
        // Flink has no syntax for them, so the Table API is the only way to call one.
        Table sensorReadings = tableEnv.from("t1");

        MyTop2MaxVc top2Function = new MyTop2MaxVc();
        sensorReadings
            .groupBy($("id"))
            .flatAggregate(Expressions.call(top2Function, $("vc")))
            .select($("id"), $("rank"), $("value"))
            .execute()
            .print();

    }

    /**
     * Table aggregate function that keeps the two largest {@code vc} values seen so
     * far and emits them as rows of {@code (rank STRING, value INT)}.
     *
     * <p>An accumulator slot that is {@code null} means "no value for this rank yet",
     * so zero and negative readings are ranked like any other value.
     */
    @FunctionHint(output = @DataTypeHint("Row<rank STRING,value INT>"))
    public static class MyTop2MaxVc extends TableAggregateFunction<Row,MyTop2Vc>{

        /** Rank label emitted for the largest value. */
        private static final String FIRST_LABEL = "first";

        /** Rank label emitted for the second-largest value. */
        private static final String SECOND_LABEL = "second";

        /**
         * Creates an empty accumulator.
         *
         * @return an accumulator whose first and second places are both {@code null}
         */
        @Override
        public MyTop2Vc createAccumulator() {
            return new MyTop2Vc(null, null);
        }

        /**
         * Folds one {@code vc} value into the current top two.
         *
         * <p>A value equal to the current first place does not replace it but still
         * competes for second place, so duplicates can occupy both ranks.
         *
         * @param accumulator the running top two, updated in place
         * @param vc          the incoming value; {@code null} is ignored
         */
        public void accumulate(MyTop2Vc accumulator, Integer vc) {
            if (vc == null) {
                // A NULL vc carries no ranking information; skip it instead of
                // failing on auto-unboxing.
                return;
            }

            if (accumulator.firstVc == null || vc > accumulator.firstVc) {
                // Demote the previous first place (possibly still null) to second place,
                // then promote the new value.
                accumulator.secondVc = accumulator.firstVc;
                accumulator.firstVc = vc;
            } else if (accumulator.secondVc == null || vc > accumulator.secondVc) {
                accumulator.secondVc = vc;
            }
        }

        /**
         * Emits the current result: zero, one or two rows, in rank order.
         *
         * @param acc the accumulator to read
         * @param out collector receiving one {@code (rank, value)} row per filled slot
         */
        public void emitValue(MyTop2Vc acc, Collector<Row> out) {
            if (acc.firstVc == null) {
                // No non-null value has been accumulated yet, so there is nothing to report.
                return;
            }

            out.collect(Row.of(FIRST_LABEL, acc.firstVc));
            if (acc.secondVc != null) {
                out.collect(Row.of(SECOND_LABEL, acc.secondVc));
            }
        }
    }

    /**
     * Accumulator for {@link MyTop2MaxVc}: the largest and second-largest values
     * seen so far. A {@code null} field means that rank has not been filled yet.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class MyTop2Vc {
        /** Largest value seen so far, or {@code null} if none. */
        private Integer firstVc;
        /** Second-largest value seen so far, or {@code null} if none. */
        private Integer secondVc;
    }
}
